/**
 * Copyright (c) 2018, 西安星沙网络科技-版权所有
 *
 * Licensed under the GNU Lesser General Public License (LGPL) ,Version 3.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.gnu.org/licenses/lgpl-3.0.txt
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package cn.waleychain.exchange.service.impl.trade;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import com.alibaba.fastjson.JSONObject;
import com.jsuportframework.core.thread.JSMultitaskException;
import com.jsuportframework.core.thread.executer.JSExecuteTask;
import com.jsuportframework.util.JSUtils;
import com.jsuportframework.util.date.JSDate;

import cn.waleychain.exchange.core.Global;
import cn.waleychain.exchange.core.SysParaKey;
import cn.waleychain.exchange.core.cache.CacheConts;
import cn.waleychain.exchange.core.constant.DDIC;
import cn.waleychain.exchange.core.constant.KLineDataType;
import cn.waleychain.exchange.core.constant.WSConstant;
import cn.waleychain.exchange.core.entity.KLineUser;
import cn.waleychain.exchange.core.entity.MarketResponseDTO;
import cn.waleychain.exchange.core.entity.TradeDataDesc;
import cn.waleychain.exchange.core.entity.TradeNewest;
import cn.waleychain.exchange.core.entity.TradeTurnoverDetails;
import cn.waleychain.exchange.core.result.RetResultCode;
import cn.waleychain.exchange.core.utils.BigDecimalUtils;
import cn.waleychain.exchange.core.utils.RateUtils;
import cn.waleychain.exchange.core.vaildate.VaildateHelper;
import cn.waleychain.exchange.dao.DealOrderMapper;
import cn.waleychain.exchange.dao.TradeDataMapper;
import cn.waleychain.exchange.feign.ConfigServiceFeign;
import cn.waleychain.exchange.model.Market;
import cn.waleychain.exchange.model.TradeData;
import cn.waleychain.exchange.service.impl.BaseServiceImpl;
import cn.waleychain.exchange.service.market.CoinService;
import cn.waleychain.exchange.service.market.MarketService;
import cn.waleychain.exchange.service.trade.CacheDataService;
import cn.waleychain.exchange.service.trade.TradeDataService;

@Service
public class TradeDataServiceImpl extends BaseServiceImpl implements TradeDataService {

	@Autowired
	private MarketService marketService;
	
	@Autowired
	private ConfigServiceFeign configFeign;
	
	@Autowired
	private TradeDataMapper tradeDataMapper; 
	
	@Autowired
	private DealOrderMapper dealOrderMapper;
	
	@Autowired
	private CacheDataService cacheDataService;
	
	@Autowired
	private CoinService coinService;
	
	/**
	 * Queues one cache-refresh task per valid market and per K-line list type.
	 *
	 * <p>Each task calls {@link #resetDataList(long, KLineDataType)} for its
	 * market/type pair. Nothing is queued when the market list is null or empty.
	 *
	 * @throws Exception if the market list cannot be loaded or a task cannot be queued
	 */
	@Override
	public void syncGetData() throws Exception {
		List<Market> marketList = marketService.fetchCacheMarketVaildList();
		// The previous guard (size() <= 0) was inverted: the loop only ran for an
		// empty list, so no refresh task was ever queued.
		if (marketList == null || marketList.isEmpty()) {
			return;
		}

		// The list types that are refreshed, in the order they were originally queued.
		KLineDataType[] refreshedTypes = {
				KLineDataType.ONE_SECONDS_LIST,
				KLineDataType.ONE_MINUTES_LIST,
				KLineDataType.FIVE_MINUTES_LIST,
				KLineDataType.FIFTEEN_MINUTES_LIST,
				KLineDataType.THIRTY_MINUTES_LIST,
				KLineDataType.ONE_HOURS_LIST,
				KLineDataType.ONE_DAY_LIST,
				KLineDataType.SEVEN_DAY_LIST
		};

		for (Market market : marketList) {
			if (market == null) {
				continue;
			}
			for (KLineDataType dataType : refreshedTypes) {
				ThreadPoolUtils.getInstance().getDataPutGroup().getQueue()
						.addTask(this.getDataGetTask(market.getMarketId(), dataType));
			}
		}
	}

	/**
	 * Queues one {@code TradeDataPutTask} per valid market when data
	 * synchronisation is switched on in the system configuration.
	 *
	 * <p>The switch is read from {@code SysParaKey.TRADE_CONFIG_DATA_SYNC_STATUS};
	 * any value other than {@code DDIC.Boolean.BOOL_TRUE_1.id} makes this a no-op.
	 *
	 * @throws Exception if the configuration or market list cannot be read, or a task cannot be queued
	 */
	@Override
	public void syncPutData() throws Exception {

		int syncStatus = Integer.parseInt(configFeign.fetchSystemConfigValue(SysParaKey.TRADE_CONFIG_DATA_SYNC_STATUS));
		if (DDIC.Boolean.BOOL_TRUE_1.id != syncStatus) {
			return;
		}

		List<Market> marketList = marketService.fetchCacheMarketVaildList();
		if (marketList == null || marketList.size() <= 0) {
			return;
		}

		for (Market market : marketList) {
			// Collaborators are handed over through the parameter map because the
			// task class is static and has no access to this bean's fields.
			Map<String, Object> taskArgs = new HashMap<>();
			taskArgs.put("marketId", market.getMarketId());
			taskArgs.put("date", new Date());
			taskArgs.put("dealOrderMapper", this.dealOrderMapper);
			taskArgs.put("tradeDataService", this);
			taskArgs.put("tradeDataMapper", this.tradeDataMapper);

			ThreadPoolUtils.getInstance().getDataPutGroup().getQueue()
					.addTask(new TradeDataServiceImpl.TradeDataPutTask("dataPutTask", taskArgs));
		}
	}
	
	/**
	 * Builds the task that refreshes the cached K-line list of one market and type.
	 *
	 * @param marketId market whose list is refreshed
	 * @param dataType K-line list type to refresh
	 * @return a task named {@code "dataGetTask"} carrying the market id, the data
	 *         type and this service in its parameter map
	 */
	private TradeDataServiceImpl.TradeDataGetTask getDataGetTask(long marketId, KLineDataType dataType) {
		Map<String, Object> taskArgs = new HashMap<>();
		taskArgs.put("tradeDataService", this);
		taskArgs.put("dataType", dataType);
		taskArgs.put("marketId", marketId);
		return new TradeDataServiceImpl.TradeDataGetTask("dataGetTask", taskArgs);
	}

	/**
	 * Task that asks the trade data service to rebuild one cached K-line list.
	 *
	 * <p>Expects the parameter map entries {@code "dataType"} (KLineDataType),
	 * {@code "marketId"} (Long) and {@code "tradeDataService"} (TradeDataService).
	 */
	private static class TradeDataGetTask extends JSExecuteTask {
		private static final long serialVersionUID = 7038777663769665940L;

		public TradeDataGetTask(String taskName, Map<String, Object> paramMap) {
			super(taskName, paramMap);
		}

		/**
		 * Calls {@code resetDataList} for the configured market and data type.
		 *
		 * @param param task parameters; a null map makes the task a no-op
		 * @return always {@code null}
		 */
		public Object execute(Map<String, Object> param) throws JSMultitaskException {
			if (param == null) {
				return null;
			}

			KLineDataType requestedType = (KLineDataType) param.get("dataType");
			long targetMarketId = ((Long) param.get("marketId")).longValue();
			TradeDataService service = (TradeDataService) param.get("tradeDataService");
			service.resetDataList(targetMarketId, requestedType);
			return null;
		}
	}

	/**
	 * Task that samples the turnover of one market for a given instant, feeds it
	 * into the cached K-line lists and persists it when a trade price exists.
	 *
	 * <p>Expects the parameter map entries {@code "dealOrderMapper"},
	 * {@code "tradeDataService"}, {@code "tradeDataMapper"}, {@code "marketId"}
	 * (Long) and {@code "date"} (Date).
	 */
	private static class TradeDataPutTask extends JSExecuteTask {

		private static final long serialVersionUID = -4949705501595208023L;

		public TradeDataPutTask(String taskName, Map<String, Object> paramMap) {
			super(taskName, paramMap);
		}

		/**
		 * Fetches the turnover for the configured market/date and records it.
		 *
		 * @param param task parameters; a null map makes the task a no-op, matching
		 *              the behaviour of {@code TradeDataGetTask}
		 * @return always {@code null}
		 */
		public Object execute(Map<String, Object> param) throws JSMultitaskException {
			if (param == null) {
				return null;
			}

			DealOrderMapper dealOrderMapper = (DealOrderMapper) param.get("dealOrderMapper");
			TradeDataService tradeDataService = (TradeDataService) param.get("tradeDataService");
			long marketId = ((Long) param.get("marketId")).longValue();
			Date date = (Date) param.get("date");
			TradeDataMapper tradeDataMapper = (TradeDataMapper) param.get("tradeDataMapper");

			// Turnover (average price and total volume) for the sampled instant.
			TradeTurnoverDetails price = dealOrderMapper.fetchSecondsTurnoverPrice(marketId, date);

			TradeData data = new TradeData();
			data.setMarketId(marketId);   // market id
			data.setCreateTime(date);     // sample time, including h/m/s

			// NOTE(review): the mapper may return null when no deal matched (e.g. an
			// all-NULL aggregate row) - confirm against the mapper XML. A missing
			// result is treated as "no trade" instead of failing with an NPE, so the
			// K-line buckets are still rolled forward by addData below.
			BigDecimal tradePrice = null;
			if (price != null) {
				tradePrice = BigDecimalUtils.getRoundAmount(price.getTurnoverAveragePrice(), Global.RETAIN_DECIMAL_LEN);
				data.setTradeNum(price.getTurnoverTotalNum());   // total traded volume
			}
			data.setTradePrice(tradePrice);                      // average trade price

			// Always update the cache; only rows with a positive price are persisted.
			tradeDataService.addData(data);
			if (tradePrice != null && tradePrice.signum() > 0) {
				tradeDataMapper.insertSelective(data);
			}

			return null;
		}
	}
	
	/**
	 * Rebuilds the display rows of one K-line list and stores them through
	 * {@code cacheDataService.setKLines2Cache}.
	 *
	 * <p>Up to 1000 entries are read via {@link #getData}. Each entry becomes one
	 * comma-separated row: formatted statistics time, open, close, max, min price
	 * and total volume. When no entries exist an empty map is handed to the cache
	 * service. Failures of the cache call are printed and not propagated.
	 *
	 * @param marketId market whose list is rebuilt
	 * @param dataType K-line list type to rebuild
	 */
	public void resetDataList(long marketId, KLineDataType dataType) {
		final int maxRows = 1000;

		Map<String, List<String>> klineByType = new HashMap<>();
		List<TradeDataDesc> buckets = this.getData(marketId, dataType, 1, maxRows);
		if (buckets != null && !buckets.isEmpty()) {
			// The row list is only created when data exists; creating/clearing it
			// unconditionally used to wipe the home page data after midnight.
			List<String> rows = new ArrayList<>();
			for (TradeDataDesc bucket : buckets) {
				String row = JSDate.formatDateTime(bucket.getStatisticsTime()) + ","
						+ bucket.getOpenPrice().toPlainString() + ","
						+ bucket.getClosePrice().toPlainString() + ","
						+ bucket.getMaxPrice().toPlainString() + ","
						+ bucket.getMinPrice().toPlainString() + ","
						+ bucket.getTotalNum().toPlainString();
				rows.add(row);
			}
			klineByType.put(dataType.getKey(), rows);
		}

		// Publish the rebuilt rows to the cache.
		try {
			cacheDataService.setKLines2Cache(marketId, klineByType);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * Feeds one trade sample into the cached list of every {@link KLineDataType}.
	 *
	 * @param tradeData sample to record
	 * @throws NullPointerException     if {@code tradeData} is null
	 * @throws IllegalArgumentException if the sample's market id is not positive
	 */
	@Override
	public void addData(TradeData tradeData) {
		if (tradeData == null) {
			throw new NullPointerException("data can not be null!");
		}
		if (tradeData.getMarketId() <= 0L) {
			throw new IllegalArgumentException("Illegal market id: " + tradeData.getMarketId());
		}

		KLineDataType[] allTypes = KLineDataType.values();
		for (int i = 0; i < allTypes.length; i++) {
			addData(tradeData, allTypes[i]);
		}
	}
	
	/**
	 * Merges one trade sample into the cached list of a single K-line type.
	 *
	 * <p>The newest bucket is the head of the Redis list keyed by
	 * {@link #genKey}. When the sample time lies after the head's statistics
	 * time a fresh bucket is pushed: a fully populated head is kept (trimming the
	 * tail once the list holds 10000 entries), an incomplete head is replaced.
	 * A sample with a positive trade price then updates the head's open, close,
	 * max and min price and adds its volume. Does nothing when either argument
	 * is null.
	 *
	 * @param tradeData sample to merge
	 * @param dataType  K-line list type to update
	 */
	public void addData(TradeData tradeData, KLineDataType dataType) {

		if (tradeData == null || dataType == null) {
			return;
		}

		final long maxListLength = 10000L;

		// Key of the list for this market and type.
		String dataKey = this.genKey(tradeData.getMarketId(), dataType);
		// Current list size.
		long length = this.redisService.llen(CacheConts.CACHE_DATA_LIST, dataKey);
		// Tracks whether the list has a head entry that can be overwritten with lset.
		boolean headExists = length > 0L;
		// Sample time.
		Date currentTime = tradeData.getCreateTime();
		// Statistics time of the bucket the sample belongs to.
		Date statisticsTime = TradeUtils.getStatisticsDate(dataType.getKey(), currentTime);
		// Newest bucket, if any.
		List<String> elements = this.redisService.lrange(CacheConts.CACHE_DATA_LIST, dataKey, 0L, 0L);
		TradeDataDesc descObject = new TradeDataDesc(statisticsTime);
		if (JSUtils.ifCollectionNotEmpty(elements)) {
			descObject = new TradeDataDesc(elements.get(0));
		}

		if (currentTime.after(descObject.getStatisticsTime())) {
			boolean headComplete = descObject.getOpenPrice().signum() > 0
					&& descObject.getClosePrice().signum() > 0
					&& descObject.getMaxPrice().signum() > 0
					&& descObject.getMinPrice().signum() > 0
					&& descObject.getTotalNum().signum() > 0;
			if (headComplete) {
				if (length >= maxListLength) {
					// Drop the oldest bucket to keep the list bounded.
					this.redisService.rpop(CacheConts.CACHE_DATA_LIST, dataKey);
				}
			} else {
				// The head never received a trade: replace it instead of keeping it.
				this.redisService.lpop(CacheConts.CACHE_DATA_LIST, dataKey);
			}
			descObject = new TradeDataDesc(statisticsTime);
			this.redisService.lpush(CacheConts.CACHE_DATA_LIST, dataKey, descObject.toString());
			// A head now exists even if the list was empty before; relying on the
			// stale length would push the same bucket a second time below.
			headExists = true;
		}

		BigDecimal tradePrice = tradeData.getTradePrice();
		if (tradePrice == null || tradePrice.signum() <= 0) {
			return;
		}

		// Prices are compared by value; the former identity checks on the result
		// of BigDecimal.max/min only worked because those return an operand.
		BigDecimal maxPrice = descObject.getMaxPrice();
		if (maxPrice == null || tradePrice.compareTo(maxPrice) > 0) {
			descObject.setMaxPrice(tradePrice);
		}
		BigDecimal minPrice = descObject.getMinPrice();
		if (minPrice == null || minPrice.signum() <= 0 || tradePrice.compareTo(minPrice) < 0) {
			descObject.setMinPrice(tradePrice);
		}
		BigDecimal openPrice = descObject.getOpenPrice();
		if (openPrice == null || openPrice.signum() <= 0) {
			descObject.setOpenPrice(tradePrice);
		}
		BigDecimal tradeNum = tradeData.getTradeNum();
		if (tradeNum != null) {
			// A sample without volume leaves the bucket total untouched.
			BigDecimal totalNum = descObject.getTotalNum();
			descObject.setTotalNum(totalNum == null ? tradeNum : totalNum.add(tradeNum));
		}
		descObject.setClosePrice(tradePrice);

		if (headExists) {
			this.redisService.lset(CacheConts.CACHE_DATA_LIST, dataKey, descObject.toString());
		} else {
			this.redisService.lpush(CacheConts.CACHE_DATA_LIST, dataKey, descObject.toString());
		}
	}

	/**
	 * Reads a page of K-line buckets from the cached list of a market and type.
	 *
	 * @param marketId market to read
	 * @param dataType K-line list type to read
	 * @param start    1-based position of the first entry; values below 1 are treated as 1
	 * @param limit    number of entries to read; values below 1 are treated as 1
	 * @return the parsed entries, never null; empty when the list holds no data in that range
	 */
	@Override
	public List<TradeDataDesc> getData(long marketId, KLineDataType dataType, int start, int limit) {

		int firstPosition = start <= 0 ? 1 : start;
		int pageSize = limit <= 0 ? 1 : limit;

		// Convert the 1-based page into the inclusive 0-based range lrange expects.
		long fromIndex = (long) (firstPosition - 1);
		long toIndex = (long) (firstPosition + pageSize - 2);

		List<String> rawEntries = this.redisService.lrange(CacheConts.CACHE_DATA_LIST, genKey(marketId, dataType), fromIndex, toIndex);

		List<TradeDataDesc> parsed = new ArrayList<>();
		if (rawEntries == null || rawEntries.isEmpty()) {
			return parsed;
		}
		for (String rawEntry : rawEntries) {
			parsed.add(new TradeDataDesc(rawEntry));
		}
		return parsed;
	}

	/**
	 * Builds the cache list key {@code <marketId>_<dataType key>}.
	 *
	 * @param marketId market id
	 * @param dataType K-line list type; must not be null
	 * @return the list key for that market and type
	 */
	private String genKey(long marketId, KLineDataType dataType) {
		return marketId + "_" + dataType.getKey();
	}
	
	/**
	 * Returns the first cached K-line row of a market and type that does not end
	 * with {@code "0,0,0,0,0"}.
	 *
	 * @param marketId market to look up
	 * @param dataType K-line list type; null yields null
	 * @return the first such row, or null when there is no cached map, no rows
	 *         for the type, or every row ends with {@code "0,0,0,0,0"}
	 * @throws Exception if the cache lookup fails
	 */
	@Override
	public String getDataDesc(long marketId, KLineDataType dataType) throws Exception {

		if (dataType == null) {
			return null;
		}

		Map<String, List<String>> klineByType = cacheDataService.getKLines2Cache(marketId);
		if (klineByType == null) {
			return null;
		}

		List<String> rows = klineByType.get(dataType.getKey());
		if (!JSUtils.ifCollectionNotEmpty(rows)) {
			return null;
		}

		for (String row : rows) {
			if (row.endsWith("0,0,0,0,0")) {
				continue;
			}
			return row;
		}
		return null;
	}
	
	/**
	 * Returns the first {@code limit} cached K-line rows of a market and type,
	 * parsed with {@link #getNewest(String)}.
	 *
	 * @param marketId market to look up
	 * @param dataType K-line list type; null yields null
	 * @param limit    maximum number of rows to return; non-positive yields null
	 * @return the parsed rows in cache order, or null when nothing is available
	 * @throws Exception if the cache lookup fails
	 */
	public List<TradeNewest> getNewData(long marketId, KLineDataType dataType, int limit) throws Exception {

		// A non-positive limit can never produce rows; a negative one used to make
		// subList throw.
		if (dataType == null || limit <= 0) {
			return null;
		}

		Map<String, List<String>> map = cacheDataService.getKLines2Cache(marketId);
		if (map == null) {
			return null;
		}

		// The map is keyed by the type's string key. Looking it up with the enum
		// itself (as before) never matched, so this method always returned null.
		List<String> rs = map.get(dataType.getKey());
		if (!JSUtils.ifCollectionNotEmpty(rs)) {
			return null;
		}

		int count = Math.min(rs.size(), limit);
		List<TradeNewest> newestList = new ArrayList<>(count);
		for (String desc : rs.subList(0, count)) {
			newestList.add(this.getNewest(desc));
		}
		return newestList;
	}
	
	/**
	 * Parses one comma-separated K-line row into a {@link TradeNewest}.
	 *
	 * <p>The row must have exactly six fields: time, open, close, max, min, total
	 * volume. Empty numeric fields become zero and the current price is set to
	 * the close price. Any other input yields an object with no values set.
	 *
	 * @param tradeDesc row to parse; may be null or empty
	 * @return a new {@code TradeNewest}, never null
	 */
	@Override
	public TradeNewest getNewest(String tradeDesc) {

		TradeNewest parsed = new TradeNewest();
		if (StringUtils.isEmpty(tradeDesc)) {
			return parsed;
		}

		String[] fields = tradeDesc.split(",");
		if (!JSUtils.ifArrayNotEmpty(fields) || fields.length != 6) {
			return parsed;
		}

		parsed.setTotalNum(decimalOrZero(fields[5]));
		parsed.setOpenPrice(decimalOrZero(fields[1]));
		parsed.setClosePrice(decimalOrZero(fields[2]));
		parsed.setMaxPrice(decimalOrZero(fields[3]));
		parsed.setMinPrice(decimalOrZero(fields[4]));
		parsed.setCurrentPrice(parsed.getClosePrice());
		return parsed;
	}

	/** Parses a numeric row field, mapping an empty field to zero. */
	private static BigDecimal decimalOrZero(String field) {
		if (JSUtils.ifStringEmpty(field)) {
			return BigDecimal.ZERO;
		}
		return new BigDecimal(field);
	}

	/**
	 * Registers (or replaces) the K-line subscription of a websocket session.
	 *
	 * <p>The subscription is stored under the session id in
	 * {@code CacheConts.CACHE_STATIC_KLINE_USER_LIST} and given a two hour expiry.
	 *
	 * @param sessionId session the subscription belongs to
	 * @param marketId  subscribed market
	 * @param dataType  subscribed K-line list type; validated as non-null
	 * @param kLimit    value stored as the subscription's {@code kLimit}
	 * @return the stored subscription
	 * @throws Exception if validation or the cache write fails
	 */
	@Override
	public KLineUser resetKLineUser(String sessionId, Long marketId, KLineDataType dataType, int kLimit) throws Exception {

		VaildateHelper.vaildateBooleanResult(dataType == null, RetResultCode.E11001, "数据类型dataType错误");

		KLineUser subscription = new KLineUser();
		subscription.setListenerPath(WSConstant.p2pKlineListenerPath);
		subscription.setkLimit(kLimit);
		subscription.setDataType(dataType.getKey());
		subscription.setMarketId(marketId);
		subscription.setSessionId(sessionId);

		// Store first, then set the expiry.
		this.redisService.hset(CacheConts.CACHE_STATIC_KLINE_USER_LIST, sessionId, subscription);
		this.redisService.expire(CacheConts.CACHE_STATIC_KLINE_USER_LIST, sessionId, 2L, TimeUnit.HOURS);

		return subscription;
	}

	/**
	 * Loads every K-line subscription stored in
	 * {@code CacheConts.CACHE_STATIC_KLINE_USER_LIST}.
	 *
	 * @return one entry per stored key (parsed from its JSON value), or null when
	 *         no keys exist
	 * @throws Exception if the cache cannot be read
	 */
	@Override
	public List<KLineUser> fetchKLineUserList() throws Exception {

		Set<String> sessionIds = this.redisService.hkey(CacheConts.CACHE_STATIC_KLINE_USER_LIST);
		if (sessionIds == null || sessionIds.isEmpty()) {
			return null;
		}

		List<KLineUser> subscriptions = new ArrayList<>();
		for (String sessionId : sessionIds) {
			String json = this.redisService.hget(CacheConts.CACHE_STATIC_KLINE_USER_LIST, sessionId);
			subscriptions.add(JSONObject.parseObject(json, KLineUser.class));
		}
		return subscriptions;
	}

	/**
	 * Builds the market overview rows (price, 24h stats, change, price trend) for
	 * every valid market in the given trading area.
	 *
	 * @param areaId trading area to list; markets whose area id differs (or is
	 *               missing) are skipped
	 * @return one row per matching market (possibly empty), or null when there
	 *         are no valid markets at all
	 * @throws Exception if a cache, configuration or coin lookup fails
	 */
	@Override
	public List<MarketResponseDTO> fetchTradeMarketByAreaId(String areaId) throws Exception {

		// Load the valid markets.
		List<Market> marketList = marketService.fetchCacheMarketVaildList();
		if (marketList == null || marketList.isEmpty()) {
			return null;
		}

		List<MarketResponseDTO> returnList = new ArrayList<>();
		String exchangeRate = RateUtils.getUsdRate(); // USD exchange rate
		// Resolved lazily by the first market that needs it, then reused, so the
		// remote configuration is queried at most once per call.
		Long btcx2hhtMarketId = null;
		for (Market market : marketList) {
			if (market == null || market.getTransactionAreaId() == null
					|| !market.getTransactionAreaId().equals(areaId)) {
				continue;
			}

			// Latest price, denominated in the buy coin.
			// NOTE(review): assumes the cache may return null for a market without
			// trades - treated as zero instead of failing the whole listing.
			BigDecimal currentPrice = zeroIfNull(cacheDataService.getNewPrice2Cache(market.getMarketId()));
			List<BigDecimal> priceTrendList = new ArrayList<>(10);
			MarketResponseDTO responseMarket = new MarketResponseDTO();
			responseMarket.setId(market.getMarketId());         // market (pair) id
			responseMarket.setName(market.getName());           // pair name
			responseMarket.setTitle(market.getTitle());         // pair title
			responseMarket.setImage(market.getIconResUrl());    // pair icon
			responseMarket.setHighPrice(BigDecimal.ZERO);       // highest price
			responseMarket.setLowPrice(BigDecimal.ZERO);        // lowest price
			responseMarket.setChange("+0%");                    // price change
			responseMarket.setVolume(BigDecimal.ZERO);          // daily volume
			responseMarket.setTurnoverAmount(BigDecimal.ZERO);  // daily turnover
			responseMarket.setPriceTrend(null);                 // price trend

			if (market.getBuyCoinId() - coinService.fetchBtcxCoinId() != 0) {
				// Buy coin is not btcx: convert through the configured btcx/hht market.
				if (btcx2hhtMarketId == null) {
					btcx2hhtMarketId = Long.parseLong(configFeign.fetchSystemConfigValue(SysParaKey.SYS_CONFIG_BTCX_2_HHT_MARKET_ID));
				}
				BigDecimal buyCoinPrice = zeroIfNull(cacheDataService.getNewPrice2Cache(btcx2hhtMarketId));
				// CNY price = current price * price of the buy coin.
				BigDecimal currentPriceCNY = currentPrice.multiply(buyCoinPrice);
				responseMarket.setCurrentPrice(currentPrice);        // current price
				responseMarket.setCurrentPriceCNY(currentPriceCNY);  // current price in CNY
			} else {
				// btcx pair: the displayed price is the value converted with the USD rate.
				BigDecimal currentPriceUsd = currentPrice.divide(new BigDecimal(exchangeRate), 4, BigDecimal.ROUND_UP);
				responseMarket.setCurrentPrice(currentPriceUsd);     // current price
				responseMarket.setCurrentPriceCNY(currentPrice);     // current price in CNY
			}

			// Yesterday's close defaults to the market's configured open price.
			BigDecimal yesterdayClosePrice = zeroIfNull(market.getOpenPrice());
			List<TradeNewest> dataList = this.getNewData(market.getMarketId(), KLineDataType.ONE_DAY_LIST, 2);
			if (!CollectionUtils.isEmpty(dataList)) {
				TradeNewest newest = dataList.get(0);
				responseMarket.setHighPrice(newest.getMaxPrice());   // highest price
				responseMarket.setLowPrice(newest.getMinPrice());    // lowest price
				responseMarket.setVolume(newest.getTotalNum());      // daily volume
				// Daily turnover = daily volume * CNY price.
				BigDecimal turnoverAmount = zeroIfNull(newest.getTotalNum()).multiply(responseMarket.getCurrentPriceCNY());
				responseMarket.setTurnoverAmount(turnoverAmount);
				if (dataList.size() > 1) {
					// Second daily row, when present and parsed, is yesterday.
					BigDecimal previousClose = dataList.get(1).getClosePrice();
					if (previousClose != null) {
						yesterdayClosePrice = previousClose;
					}
				}

				// Change = (current price - yesterday's close) / yesterday's close * 100%.
				BigDecimal change = BigDecimal.ZERO;
				if (yesterdayClosePrice.signum() > 0) {
					change = currentPrice.subtract(yesterdayClosePrice)
							.divide(yesterdayClosePrice, 4, BigDecimal.ROUND_UP)
							.multiply(new BigDecimal(100));
				}
				// BigDecimal is immutable: the rounded value must be reassigned (the
				// result used to be discarded). Keep two decimals.
				change = change.setScale(2, BigDecimal.ROUND_UP);
				responseMarket.setChange(change.doubleValue() + "%");

				// Price trend: yesterday's close followed by the latest 30-minute closes.
				priceTrendList.add(yesterdayClosePrice);
				List<TradeNewest> newestList = this.getNewData(market.getMarketId(), KLineDataType.THIRTY_MINUTES_LIST, 9);
				if (!CollectionUtils.isEmpty(newestList)) {
					for (TradeNewest tradeNewest : newestList) {
						priceTrendList.add(tradeNewest.getClosePrice());
					}
				}
			} else {
				// No daily data: show a flat trend at the current price.
				for (int i = 0; i < 10; i++) {
					priceTrendList.add(responseMarket.getCurrentPrice());
				}
			}

			responseMarket.setPriceTrend(priceTrendList);
			returnList.add(responseMarket);
		}

		return returnList;
	}

	/** Substitutes zero for a missing amount so price arithmetic cannot hit a null. */
	private static BigDecimal zeroIfNull(BigDecimal value) {
		return value == null ? BigDecimal.ZERO : value;
	}
}
